package com.xl.competition.modul_b.task3

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

import java.util.Properties

/**
 * @author: xl
 * @createTime: 2023/11/18 15:45:35
 * @program: com.xl.competition
 * @description: Finds users who placed orders on consecutive days with increasing total
 *               consumption, and loads the result into MySQL table shtd_result.usercontinueorder.
 */
object LoadUserContinueOrderToMysql {

  /**
   * Entry point: queries Hive (dwd/dim layers) for users whose orders on consecutive
   * days show a strictly increasing total amount, then appends the result set
   * (userid, username, day, totalconsumption, totalorder) to the MySQL table
   * `shtd_result.usercontinueorder`.
   */
  def main(args: Array[String]): Unit = {
    // Hive-backed session; legacy parquet format keeps files readable by older Hive.
    val session: SparkSession =
      SparkSession
        .builder()
        .appName(this.getClass.getName)
        .enableHiveSupport()
        .config("hive.metastore.uris", "thrift://node2:9083")
        .config("spark.sql.parquet.writeLegacyFormat", "true")
        .getOrCreate()

    // Query reproduced verbatim. Outline:
    //   t1: per-user order dates normalized to yyyy-MM-dd, row-numbered by date
    //   t2: date_sub(create_time, rn) groups consecutive days to one diff_date
    //   t3: keep users with >= 2 logins in a consecutive-day run
    //   t4: per-user per-day totals (amount sum, order count) for those users
    //   t5: lag() pulls the previous day's date/amount alongside each row
    //   final: keep rows where today's amount exceeds the previous day's
    // NOTE(review): `order by user_id` inside t4 adds a shuffle but does not affect
    // the window results in t5 (they carry their own ordering) — candidate for removal.
    val continueOrders: DataFrame = session.sql(
      """
        |select userid,
        |       username,
        |       concat(befor,"_",create_time) day,
        |       all_amount + before_amount totalconsumption,
        |       totalorder
        |from (
        |         select user_id userid,
        |                username,
        |                create_time,
        |                lag(create_time, 1, "") over (partition by user_id order by create_time) befor,
        |                all_amount,
        |                lag(all_amount, 1, 99999999) over (partition by user_id order by create_time)   before_amount,
        |                sum(totalorder) over (partition by user_id order by create_time ROWS BETWEEN 1 PRECEDING and CURRENT ROW) totalorder
        |         from (
        |                  select t3.user_id,
        |                         create_time,
        |                         username,
        |                         sum(total_amount) all_amount,
        |                         count(id) totalorder
        |                  from dwd.fact_order_info foi
        |                            join (
        |                      SELECT t2.user_id,
        |                             username,
        |                             count(1) as login_times
        |                      FROM (
        |                               SELECT t1.user_id,
        |                                      t1.name                      username,
        |                                      t1.create_time,
        |                                      date_sub(create_time, rn) as diff_date
        |                               FROM (
        |                                        SELECT user_id,
        |                                               name,
        |                                               from_unixtime(unix_timestamp(foi1.create_time, 'yyyyMMdd'),
        |                                                             'yyyy-MM-dd')                                           create_time,
        |                                               row_number() over (partition by user_id order by foi1.create_time) as rn
        |                                        FROM dwd.fact_order_info foi1
        |                                                 join dim.dim_user_info
        |                                                      on user_id = dim_user_info.id
        |                                        group by user_id, name, foi1.create_time
        |                                    ) t1
        |                           ) t2
        |                      group by t2.user_id, username, t2.diff_date
        |                      having login_times >= 2
        |                  ) t3
        |                                      on foi.user_id = t3.user_id
        |                  group by t3.user_id, t3.username, create_time
        |                  order by user_id
        |              ) t4
        |     )t5
        |where all_amount > before_amount
        |""".stripMargin)

    // JDBC connection properties.
    // NOTE(review): plaintext credentials in source — consider externalizing to
    // spark-submit conf or an environment variable.
    val jdbcProps = new Properties()
    jdbcProps.setProperty("user", "root")
    jdbcProps.setProperty("password", "Abc123..")

    // Append (not overwrite): reruns will duplicate rows in the target table.
    continueOrders.write
      .mode(SaveMode.Append)
      .jdbc("jdbc:mysql://node3:3306/shtd_result", "usercontinueorder", jdbcProps)

    session.stop()
  }
}
